【Python】ExcelのTable定義書からRedshift DDLを自動生成
はじめに
こんにちは。データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回は、ExcelのTable定義書からRedshiftのDDLを作成するPython実装を行ってみたいと思います。
前提条件
- 以下リンクのExcelファイルを対象とします。
- Redshiftの対応データ型は以下AWS公式記載の19個を対象とします。
- spectrumやviewのddlには対応していないので、別途回収していただく必要があります。
No.1-19では、それぞれのデータ型のNot NULL制約、デフォルト値の確認用のテストデータ。No.20-38では、PRIMARY KEY制約の確認用のテストデータ。No.39-57では、上記制約を除いた確認用のテストデータとなります。
実装
実装したソースコードはGithubに格納してあります。
フォルダ構成は以下となります。
. ├── README.md ├── input │ └── 正常系table定義.xlsx ├── lib │ ├── environments.py │ ├── excel_processing.py │ ├── sql_processing.py │ └── table_info.py ├── main.py ├── output │ └── dwh.test.sql └── requirements.txt 3 directories, 9 files
まず、必要なライブラリをrequrements.txtからinstallします。
pip install -r requirements.txt
main.py
main.pyでは、他のライブラリを呼び出すmain処理を実装しています。 処理の流れとしては、大きく以下になります。
- 環境変数読み込み
- Excelファイル読み込み
- フォルダ配下のファイル数分繰り返し、pandasを用いてデータを取得。
- table情報からDDLを整形
- sqlファイルとして出力
import glob import warnings import traceback from lib.table_info import TableInfo from lib.environments import get_env from lib.excel_processing import extract_schema_table_name_from_excel, extract_table_data_from_excel from lib.sql_processing import make_column_definition # Excelファイルにデータ検証(Data Validation)の拡張が含まれているけど、 # その拡張は openpyxl ライブラリでサポートされていないという意味です。 # そのため、読み込むExcelファイルがこの拡張を必要としない場合には問題ありません。 warnings.filterwarnings("ignore", message="Data Validation extension is not supported and will be removed") def write_to_file(s_name: str, t_name: str, ddl: str) -> None: """ "SQL文をファイルに書き込み""" with open(f"./output/{s_name}.{t_name}.sql", "w", encoding="utf-8") as file: file.write(ddl) def main(): try: env = get_env() files = [file for file in glob.glob("./input/*.xlsx") if "~$" not in file] if not files: raise ValueError("file did not exist.") print("files:", files) for file in files: schema_name, table_name = extract_schema_table_name_from_excel(file, env) extract_table_data = extract_table_data_from_excel(file, env) column_lengths = extract_table_data[env["COLUMN_NAME"]].apply(lambda x: len(str(x))) max_column_length = max(column_lengths) ddl = f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}(\n" table_info = TableInfo(ddl, max_column_length) for _, row in extract_table_data.iterrows(): table_info.column_name = row[env["COLUMN_NAME"]] table_info.data_type = row[env["DATA_TYPE"]] table_info.digits = row[env["DIGITS"]] table_info.decimal_part = row[env["DECIMAL_PART"]] table_info.primary_key = row[env["PRIMARY_KEY"]] table_info.is_not_null = row[env["NOT_NULL"]] table_info.default_value = row[env["DEFAULT_VALUE"]] # 全て含む場合 if { table_info.column_name, table_info.decimal_part, table_info.data_type, table_info.digits, table_info.primary_key, table_info.default_value, } == {"-"}: continue elif table_info.data_type in env["REDSHIFT_DATA_TYPES"].split(","): if table_info.primary_key != "-": table_info.primary_key_list = table_info.column_name make_column_definition(table_info) else: print("error record:", vars(table_info)) raise ValueError("There is no data type. Tracking your self or ask the administorator") if table_info.primary_key_list != []: table_info.ddl += " PRIMARY KEY (" + ", ".join(table_info.primary_key_list) + ")\n" table_info.ddl = table_info.ddl.rstrip(",\n") + "\n)\nDISTSTYLE AUTO\nSORTKEY AUTO;" write_to_file(schema_name, table_name, table_info.ddl) print("successfull") except Exception: print("error file:", file) traceback.print_exc() if __name__ == "__main__": main()
要点だけ確認していきます。 openpyxl ライブラリは、Excelファイルの読み込み時に、「Data Validation extension is not supported and will be removed」という警告を出します。 これは、Excelファイルにデータ検証(Data Validation)の拡張が含まれていますが、その拡張はopenpyxl ライブラリでサポートされていないという意味になります。読み込むExcelファイルがこの拡張を必要としない場合には問題ないため、警告を無視するように実装しています。
warnings.filterwarnings("ignore", message="Data Validation extension is not supported and will be removed")
globを用いて、input配下の拡張子xlsx
のファイルを抽出します。抽出時に~$
というExcelファイルをOpen時に作成されるtmpファイルは除くようにしています。
files = [file for file in glob.glob("./input/*.xlsx") if "~$" not in file]
PandasのDataFrameの特定の列(env["COLUMN_NAME"]
という環境変数で指定されている列)を対象に、その各行の要素(値)の文字列長を計算し、その結果を新たなcolumn_lengthsとして返しています。
具体的には、apply
関数を使って lambda x: len(str(x)) という無名関数(lambda関数)を各行に適用しています。このlambda関数はその引数 x を文字列に変換 str(x) し、その長さ len() を返すものです。
column_lengths = table[env["COLUMN_NAME"]].apply(lambda x: len(str(x)))
全てのtable_infoが-
の場合は、空レコードなのでcontinueし、環境変数で定義した、Redshift データ型にMatchする場合にddlを作成します。Matchしない行がある場合は、例外処理をします。
# 全て含む場合 if { table_info.column_name, table_info.decimal_part, table_info.data_type, table_info.digits, table_info.primary_key, table_info.default_value, } == {"-"}: continue elif table_info.data_type in env["REDSHIFT_DATA_TYPES"].split(","): if table_info.primary_key != "-": table_info.primary_key_list = table_info.column_name make_column_definition(table_info) else: print("error record:", vars(table_info)) raise ValueError("There is no data type. Tracking your self or ask the administorator")
主キーが存在する場合は、主キーの記載を行います。DDLの最後にDISTSTYLE AUTO
とSORTKEY AUTO
を付与していますが、必要に応じて適切な設定を選択することが重要となります。データ分散スタイル(DISTSTYLE)はテーブルのデータがRedshiftクラスタ内のノードにどのように分散されるかを決定します。AUTOを指定すると、Redshiftはテーブルのサイズ(行数やデータ量)を見て、ALL、KEY、EVENの最適なものに自動的に切り替えます。ソートキー(SORTKEY)AUTOを指定すると、Redshiftは実行されるクエリのパターンを見て、一連のソートキーを自動的に調整してテーブルをソートします。そのため、データが増加してもクエリパフォーマンスを保てます。
if table_info.primary_key_list != []: table_info.ddl += " PRIMARY KEY (" + ", ".join(table_info.primary_key_list) + ")\n" table_info.ddl = table_info.ddl.rstrip(",\n") + "\n)\nDISTSTYLE AUTO\nSORTKEY AUTO;"
environments.py
下記ファイルでは、table定義書の環境変数をそれぞれ記載していきます。また、REDSHIFT_DATA_TYPES
にて対応のRedshiftデータ型を定義しています。
import os def get_env(): # Excelシート名 os.environ["SHEET_NAME"] = os.environ.get("SHEET_NAME", "table定義") # カラム名の列名 os.environ["COLUMN_NAME"] = os.environ.get("COLUMN_NAME", "カラム名(物理)") # データ型の列名 os.environ["DATA_TYPE"] = os.environ.get("DATA_TYPE", "データ型") # 桁数の列名 os.environ["DIGITS"] = os.environ.get("DIGITS", "桁数") # 小数点以下桁数の列名 os.environ["DECIMAL_PART"] = os.environ.get("DECIMAL_PART", "小数点以下桁数") # 主キーの列名 os.environ["PRIMARY_KEY"] = os.environ.get("PRIMARY_KEY", "PRIMARY KEY") # Not NULL制約の列名 os.environ["NOT_NULL"] = os.environ.get("NOT_NULL", "NOT NULL") # デフォルト値の列名 os.environ["DEFAULT_VALUE"] = os.environ.get("DEFAULT_VALUE", "デフォルト値") # table nameが格納されている列番号 os.environ["TABLE_NAME_LOCATION"] = os.environ.get("TABLE_NAME_LOCATION", "1") # schema nameが格納されている列番号 os.environ["SCHEMA_NAME_LOCATION"] = os.environ.get("SCHEMA_NAME_LOCATION", "5") os.environ["REDSHIFT_DATA_TYPES"] = ",".join( [ "SMALLINT", "INTEGER", "BIGINT", "DECIMAL", "REAL", "DOUBLE PRECISION", "BOOLEAN", "CHAR", "VARCHAR", "DATE", "TIMESTAMP", "TIMESTAMPTZ", "GEOMETRY", "GEOGRAPHY", "HLLSKETCH", "SUPER", "TIME", "TIMETZ", "VARBYTE", ] ) return os.environ
excel_processing.py
下記ファイルでは、Excelからデータを抽出、加工する処理を実装しています。
import pandas as pd def extract_schema_table_name_from_excel(file: str, env): """Excelファイルからtable_name,schema_nameを抽出""" header_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], nrows=0) schema_name = header_df.columns[int(env["SCHEMA_NAME_LOCATION"])] table_name = header_df.columns[int(env["TABLE_NAME_LOCATION"])] return schema_name, table_name def extract_table_data_from_excel(file: str, env) -> pd.DataFrame: """Excelファイルからデータフレームを抽出""" extract_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], header=2, dtype=str) col_map = { "column_name": env["COLUMN_NAME"], "data_type": env["DATA_TYPE"], "digits": env["DIGITS"], "decimal_part": env["DECIMAL_PART"], "primary_key": env["PRIMARY_KEY"], "not_null": env["NOT_NULL"], "default_value": env["DEFAULT_VALUE"], } extracted_data = extract_df[[col_map[key] for key in col_map if key in col_map]] return clean_table(extracted_data) def clean_table( df: pd.DataFrame, ) -> pd.DataFrame: """データフレームのクリーニング""" # 文字列型のデータの欠損値を"-"に変換します return df.fillna("-")
nrows=0
と指定することで先頭行のみを抽出しています。
header_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], nrows=0)
header=2
と指定し、2行目をheaderとし、dtype=str
とすることで全てstringで抽出するようにしています。
extract_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], header=2, dtype=str)
mapに含まれているカラムのみをdfで使用するようにします。
col_map = { "column_name": env["COLUMN_NAME"], "data_type": env["DATA_TYPE"], "digits": env["DIGITS"], "decimal_part": env["DECIMAL_PART"], "primary_key": env["PRIMARY_KEY"], "not_null": env["NOT_NULL"], "default_value": env["DEFAULT_VALUE"], } extracted_data = extract_df[[col_map[key] for key in col_map if key in col_map]]
sql_processing.py
下記ファイルで、ddl定義を行っています。{file_info.column_name:<{file_info.max_column_length}}
と最初に定義することで、それぞれのカラムごとにフォーマットのばらつきが出ないように調整しています。file_info.digits(文字桁数)
、file_info.decimal_part(小数点以下桁数)
、file_info.is_not_null(Not Null)
, file_info.default_value(デフォルト値)
にそれぞれ値がある場合は、追記するような条件分岐となっています。
def make_column_definition(file_info): sql_line = f" {file_info.column_name:<{file_info.max_column_length}} {file_info.data_type}" # Add digits and decimal part if needed if file_info.digits != "-": if file_info.decimal_part != "-": sql_line += f"({int(file_info.digits)},{file_info.decimal_part})" else: sql_line += f"({int(file_info.digits)})" # Add NOT NULL if needed if file_info.is_not_null != "-": sql_line += " NOT NULL" # Add default value if needed if file_info.default_value != "-": sql_line += f" DEFAULT '{file_info.default_value}'" file_info.ddl += sql_line + ",\n"
table_info.py
下記ファイルでは、データのset,get用のTableInfo
Classを定義しています。
class TableInfo: def __init__(self, ddl, max_column_length): self.__ddl = ddl self.__data_type = "" self.__digits = "" self.__is_not_null = "" self.__column_name = "" self.__max_column_length = max_column_length self.__decimal_part = 0 self.__primary_key = "" self.__default_value = "" self.__primary_key_list = [] # gettter method @property def ddl(self): return self.__ddl @property def data_type(self): return self.__data_type @property def digits(self): return self.__digits @property def is_not_null(self): return self.__is_not_null @property def column_name(self): return self.__column_name @property def max_column_length(self): return self.__max_column_length @property def decimal_part(self): return self.__decimal_part @property def primary_key(self): return self.__primary_key @property def primary_key_list(self): return self.__primary_key_list @property def default_value(self): return self.__default_value # setter method @ddl.setter def ddl(self, ddl): self.__ddl = ddl @data_type.setter def data_type(self, data_type): self.__data_type = data_type @digits.setter def digits(self, digits): self.__digits = digits @is_not_null.setter def is_not_null(self, is_not_null): self.__is_not_null = is_not_null @column_name.setter def column_name(self, column_name): self.__column_name = column_name @max_column_length.setter def max_column_length(self, max_column_length): self.__max_column_length = max_column_length @decimal_part.setter def decimal_part(self, decimal_part): self.__decimal_part = decimal_part @primary_key.setter def primary_key(self, primary_key): self.__primary_key = primary_key @primary_key_list.setter def primary_key_list(self, primary_key_list): self.__primary_key_list.append(primary_key_list) @default_value.setter def default_value(self, default_value): self.__default_value = default_value
実行結果
それではmain.pyを実行してみます。
(ddl_env) kasama.yoshiki@ 30_create_redshift_ddls % python main.py files: ['./input/正常系table定義.xlsx'] successfull (ddl_env) kasama.yoshiki@30_create_redshift_ddls %
outputフォルダに対象のddlが作成されました。以下の観点でも問題ないことを確認できました。
col_column_name_1
:Not NULL
,デフォルト値
、桁数``小数点以下桁数
col_column_name_2
:primary key
col_column_name_3
: 制約なし
CREATE TABLE IF NOT EXISTS dwh.test( col_smallint_1 SMALLINT NOT NULL DEFAULT '0', col_integer_1 INTEGER NOT NULL DEFAULT '0', col_bigint_1 BIGINT NOT NULL DEFAULT '0', col_decimal_1 DECIMAL(5,2) NOT NULL DEFAULT '0.0', col_real_1 REAL NOT NULL DEFAULT '0.0', col_double_1 DOUBLE PRECISION NOT NULL DEFAULT '0.0', col_boolean_1 BOOLEAN NOT NULL DEFAULT 'True', col_char_1 CHAR(10) NOT NULL, col_varchar_1 VARCHAR(255) NOT NULL, col_date_1 DATE NOT NULL DEFAULT '2020-01-01', col_timestamp_1 TIMESTAMP NOT NULL DEFAULT '2000-01-01 00:00:00', col_timestamptz_1 TIMESTAMPTZ NOT NULL DEFAULT '2000-01-01 00:00:00+09', col_geometry_1 GEOMETRY NOT NULL, col_geography_1 GEOGRAPHY NOT NULL, col_hllsketch_1 HLLSKETCH NOT NULL, col_super_1 SUPER NOT NULL DEFAULT '{"key": "value"}', col_time_1 TIME NOT NULL DEFAULT '00:00:00', col_timetz_1 TIMETZ NOT NULL DEFAULT '00:00:00+09', col_varbyte_1 VARBYTE NOT NULL, col_smallint_2 SMALLINT, col_integer_2 INTEGER, col_bigint_2 BIGINT, col_decimal_2 DECIMAL(5,2), col_real_2 REAL, col_double_2 DOUBLE PRECISION, col_boolean_2 BOOLEAN, col_char_2 CHAR(10), col_varchar_2 VARCHAR(255), col_date_2 DATE, col_timestamp_2 TIMESTAMP, col_timestamptz_2 TIMESTAMPTZ, col_geometry_2 GEOMETRY, col_geography_2 GEOGRAPHY, col_hllsketch_2 HLLSKETCH, col_super_2 SUPER, col_time_2 TIME, col_timetz_2 TIMETZ, col_varbyte_2 VARBYTE, col_smallint_3 SMALLINT, col_integer_3 INTEGER, col_bigint_3 BIGINT, col_decimal_3 DECIMAL(5,2), col_real_3 REAL, col_double_3 DOUBLE PRECISION, col_boolean_3 BOOLEAN, col_char_3 CHAR(10), col_varchar_3 VARCHAR(255), col_date_3 DATE, col_timestamp_3 TIMESTAMP, col_timestamptz_3 TIMESTAMPTZ, col_geometry_3 GEOMETRY, col_geography_3 GEOGRAPHY, col_hllsketch_3 HLLSKETCH, col_super_3 SUPER, col_time_3 TIME, col_timetz_3 TIMETZ, col_varbyte_3 VARBYTE, PRIMARY KEY (col_smallint_2, col_integer_2, col_bigint_2, col_decimal_2, col_real_2, col_double_2, col_boolean_2, col_char_2, col_varchar_2, col_date_2, col_timestamp_2, col_timestamptz_2, col_time_2, col_timetz_2, col_varbyte_2) ) DISTSTYLE AUTO SORTKEY AUTO;
このddlをRedshift Serverlessで実行してみたいと思います。
table定義書で指定したdwh
スキーマは今回は作成していないため、schema指定を外して実行しましたが、問題なく成功しました。
table definitionからも正常に作成されていることを確認できました。
最後に
table定義書のExcelファイルはあくまでサンプルですので、それぞれのプロジェクトに合わせた内容にpandas実装、environments.py
などを修正し、役立てていただければと思います。